package com.coffee.cmt.test;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author songkui
 * @since 2024/6/20 10:17
 */
public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        // 设置你的执行环境，任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // 设定数据来源
        DataStream<AccountTransation> transactions = env
                .addSource(new RandomNumberAccountTransactionSource())
                .name("transactions");

        // 处理数据来源
        DataStream<AccountTransation> alerts = transactions
                .keyBy(AccountTransation::getAccountId)
                .process(new FraudDetector()) // 这里这个对象才是处理数据来源的
                .name("fraud-detector");

        // sink就是最后的结果()
        alerts
                .addSink(new AccountAlertSink())
                .name("send-alerts");

        // 最后开始执行
        env.execute("Fraud Detection");
    }

}
